-- SET sql-client.execution.result-mode=TABLEAU;
-- Run with: sql-client.sh -f flink_kafka2mysql.q

-- JDBC (MySQL) table mapped onto the physical table `tbl_host_zzdetail`,
-- exposing two string columns.
-- NOTE(review): no statement visible in this script reads from or writes to
-- this table -- confirm it is still required.
-- NOTE(review): the credentials below are stored in plain text; consider
-- keeping them out of version control.
CREATE TABLE tbl_host_zzdetail (
    content  STRING,
    content2 STRING
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://hllxd101:53149/db58_zp_bi_dev',
    'table-name' = 'tbl_host_zzdetail',
    'username'   = 'zpbi_wr',
    'password'   = '39086120578ddd85'
);

-- Kafka-backed table over topic `host_zzdetail2`; the 'raw' format exposes
-- each record as the single string column `content`.
CREATE TABLE hdp_lbg_supin_ods_tsar_detail2 (
    content STRING
) WITH (
    'connector' = 'kafka',
    'format' = 'raw',
    'topic' = 'host_zzdetail2',
    'properties.bootstrap.servers' = 'node101:9092',
    -- Alternative startup mode (currently disabled): 'group-offsets'.
    'scan.startup.mode' = 'earliest-offset',
    'sink.parallelism' = '1'
);

-- Kafka-backed table over topic `host_zzdetail2`, decoded with the 'json' format.
-- The column COMMENT literals are kept verbatim; English glosses:
--   log_ts       : time the data was received
--   trace_id     : transport trace id
--   package_len  : received length of the data record
--   package_len2 : actual length after parsing
--   item_json    : data content
-- NOTE(review): this table and hdp_lbg_supin_ods_tsar_detail2 point at the
-- same topic with different formats -- confirm that is intentional.
CREATE TABLE hdp_hism_idea_dwd_host_zzdetail (
    `log_ts`       STRING COMMENT '数据接收时间',
    `trace_id`     STRING COMMENT '传输traceId',
    `package_len`  BIGINT COMMENT '数据record接收数据长度',
    `package_len2` BIGINT COMMENT '数据解析后实际长度',
    `item_json`    STRING COMMENT '数据内容'
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    'topic' = 'host_zzdetail2',
    'properties.bootstrap.servers' = 'node101:9092',
    'scan.startup.mode' = 'earliest-offset',
    'sink.parallelism' = '1'
);

-- Preview query: extracts the individual fields of `item_json` as string
-- columns alongside `log_ts`.
--
-- Sample payload, kept here as a comment so the script stays parseable.
-- NOTE(review): presumably an example of the `item_json` value; its keys
-- match the JSON paths extracted below.
--   "{
--   \"ts\":\"20240714204601\",
--   \"type\":\"disk\",
--   \"host\":\"hllxd.website.com\",
--   \"mod\":\"partition:/var/lib/docker/overlay2:btotl\",
--   \"value\":\"42140479488.0\"
--   }"
SELECT
  log_ts,
  json_value(item_json, '$.ts') AS ts,
  -- `type` is back-quoted because TYPE appears in Flink's keyword list.
  json_value(item_json, '$.type') AS `type`,
  json_value(item_json, '$.host') AS host,
  json_value(item_json, '$.mod') AS item_mod,
  json_value(item_json, '$.value') AS item_val
FROM hdp_hism_idea_dwd_host_zzdetail;


-- Streams the flattened `item_json` fields into tbl_hism_realtime_host_zzdetail.
-- NOTE(review): the target table is not declared in the visible part of this
-- script (the JDBC table declared here is `tbl_host_zzdetail`) -- confirm it
-- exists in the catalog or add its CREATE TABLE.
-- NOTE(review): the insert is positional; the select-list order must match
-- the target's column order. Add an explicit column list once the target
-- schema is confirmed.
INSERT INTO tbl_hism_realtime_host_zzdetail
SELECT
  -- Constant first column; presumably a placeholder id -- TODO confirm.
  0,
  CAST(PROCTIME() AS STRING) AS proc_time,
  log_ts,
  json_value(item_json, '$.ts') AS ts,
  -- `type` is back-quoted because TYPE appears in Flink's keyword list.
  json_value(item_json, '$.type') AS `type`,
  json_value(item_json, '$.host') AS host_id,
  json_value(item_json, '$.mod') AS item_mod,
  json_value(item_json, '$.value') AS item_val
FROM hdp_hism_idea_dwd_host_zzdetail;
